package com.superid.clients;

import com.superid.config.KafkaCommonConfig;
import com.superid.config.KafkaProducerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * @author dufeng
 * @create: 2018-08-08 15:54
 */
@Component
public class AvroKafkaProducer implements InitializingBean {

    private static final Logger logger = LoggerFactory.getLogger(AvroKafkaProducer.class);
    @Autowired
    private KafkaCommonConfig commonConfig;

    @Autowired
    private KafkaProducerConfig producerConfig;

    KafkaProducer producer;

    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("kafka config= "+commonConfig);
        logger.info("kafka producer config="+producerConfig);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,commonConfig.getBootstrapServers() );
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, commonConfig.getSchemaRegistryUrl());
        props.put(ProducerConfig.ACKS_CONFIG, producerConfig.getAcks());
        props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.getBatchSize());
        props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.getLingerMs());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, producerConfig.isEnableIdempotence());
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, producerConfig.getMaxInFlightRequestsPerConnection());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.getBufferMemory());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        producer = new KafkaProducer(props);
    }

    public void send(String topic, GenericRecord msg) {
        producer.send(new ProducerRecord(topic, msg));
    }

    public void send(String topic, int key, GenericRecord msg) {
        producer.send(new ProducerRecord(topic, key, msg));
    }
}
